package com.xl.competition.old.test

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, days, substring, udf}
import org.apache.spark.sql.{Column, SaveMode, SparkSession}

import java.text.SimpleDateFormat
import java.util.{Calendar, Date, Properties}

/**
 * One-shot ingestion job: copies source tables from the MySQL database
 * `shtd_store` into the Hive database `ods`, stamping each row with a
 * partition column `etl_date` and appending into per-table Hive tables.
 *
 * @author: xl
 * @createTime: 2023/11/14 09:19:00
 * @program: com.xl.competition
 */
object InitData {

  /**
   * Entry point.
   *
   * @param args optional; args(0) overrides the host used for both the Hive
   *             metastore (port 9083) and the MySQL source (port 3306).
   */
  def main(args: Array[String]): Unit = {

    // FIX: the original `val ipaddr: String =` had no right-hand side, which
    // is a compile error. Take the host from the first CLI argument, with a
    // local default so the program remains runnable with no arguments.
    val ipaddr: String = args.headOption.getOrElse("127.0.0.1")

    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getName)
      .enableHiveSupport()
      .config("hive.metastore.uris", s"thrift://$ipaddr:9083")
      .config("spark.sql.parquet.writeLegacyFormat", "true")
      .getOrCreate()

    // JDBC credentials for the MySQL source.
    // NOTE(review): hard-coded credentials — consider externalizing to config.
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "Abc123..")

    // Constant ETL batch date stamped onto every ingested row; doubles as the
    // Hive partition value.
    val etlDate: UserDefinedFunction = udf(() => "20231220")

    // Produces "now + 1 day" formatted as yyyy-MM-dd HH:mm:ss, used to
    // overwrite `create_time` for the region/province dimension tables.
    // FIX: dropped the `return` keyword — `return` inside a Scala lambda is a
    // nonlocal return (implemented by throwing NonLocalReturnControl); the
    // last expression of the block is already the lambda's result.
    val createTime: UserDefinedFunction = udf(() => {
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val calendar: Calendar = Calendar.getInstance()
      calendar.setTime(new Date())
      calendar.add(Calendar.DATE, 1)
      format.format(calendar.getTime)
    })

    spark.sql("""create database if not EXISTS ods""")

    // Single source URL shared by every read below.
    val jdbcUrl = s"jdbc:mysql://$ipaddr:3306/shtd_store"

    val tableNames: List[String] = List("base_province", "base_region", "cart_info", "order_detail", "order_info", "sku_info", "user_info")
    for (elem <- tableNames) {
      // FIX: every branch previously wrote to s"ods.$tableName" where
      // tableName was hard-coded to "user_info", so all seven source tables
      // were appended into the single table ods.user_info. Each source table
      // must land in its own target: s"ods.$elem".
      if ("sku_info".equals(elem)) {
        // sku_info: ingest as-is, no row filtering.
        spark.read
          .jdbc(jdbcUrl, elem, prop)
          .withColumn("etl_date", etlDate())
          .write
          .mode(SaveMode.Append)
          .partitionBy("etl_date")
          .saveAsTable(s"ods.$elem")
      }
      else if (!elem.equals("base_province") && !elem.equals("base_region")) {
        // Fact tables: keep only rows whose create_time characters 9-10 are
        // "20" (1-based substring of the timestamp string).
        // NOTE(review): the intent of this filter is not evident from here —
        // confirm it matches the expected create_time format.
        spark.read
          .jdbc(jdbcUrl, elem, prop)
          .withColumn("etl_date", etlDate())
          .where(substring(col("create_time"), 9, 2) === "20")
          .write
          .mode(SaveMode.Append)
          .partitionBy("etl_date")
          .saveAsTable(s"ods.$elem")
      } else {
        // Dimension tables (base_province / base_region): stamp a synthetic
        // create_time of "tomorrow".
        spark.read
          .jdbc(jdbcUrl, elem, prop)
          .withColumn("etl_date", etlDate())
          .withColumn("create_time", createTime())
          .write
          .mode(SaveMode.Append)
          .partitionBy("etl_date")
          .saveAsTable(s"ods.$elem")
      }

    }
    spark.stop()
  }
}
